/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.cache30;

import static java.lang.Runtime.getRuntime;
import static java.lang.System.currentTimeMillis;
import static org.apache.geode.cache.DataPolicy.REPLICATE;
import static org.apache.geode.cache.Scope.DISTRIBUTED_NO_ACK;
import static org.apache.geode.distributed.ConfigurationProperties.ASYNC_DISTRIBUTION_TIMEOUT;
import static org.apache.geode.distributed.ConfigurationProperties.ASYNC_MAX_QUEUE_SIZE;
import static org.apache.geode.distributed.ConfigurationProperties.ASYNC_QUEUE_TIMEOUT;
import static org.apache.geode.internal.tcp.Connection.FORCE_ASYNC_QUEUE;
import static org.apache.geode.test.dunit.LogWriterUtils.getLogWriter;
import static org.apache.geode.test.dunit.Wait.pause;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.Serializable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Properties;
import java.util.Set;

import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.Region.Entry;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.LogWriterUtils;
import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.Wait;
import org.apache.geode.test.dunit.WaitCriterion;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.junit.categories.MembershipTest;

/**
 * Test to make sure slow receiver queuing is working
 *
 * @since GemFire 4.2.1
 */
@Category({MembershipTest.class})
@Ignore("Test was disabled by renaming to DisabledTest")
public class SlowRecDUnitTest extends JUnit4CacheTestCase {

  protected static Object lastCallback = null;

  // this test has special config of its distributed system so
  // the setUp and tearDown methods need to make sure we don't
  // use the ds from previous test and that we don't leave ours around
  // for the next test to use.

  @Override
  public final void preSetUp() throws Exception {
    disconnectAllFromDS();
  }

  @Override
  public final void postTearDownCacheTestCase() throws Exception {
    disconnectAllFromDS();
  }

  private VM getOtherVm() {
    Host host = Host.getHost(0);
    return host.getVM(0);
  }

  private void doCreateOtherVm(final Properties p, final boolean addListener) {
    VM vm = getOtherVm();
    vm.invoke(new CacheSerializableRunnable("create root") {
      @Override
      public void run2() throws CacheException {
        getSystem(p);
        createAckRegion(true, false);
        AttributesFactory af = new AttributesFactory();
        af.setScope(Scope.DISTRIBUTED_NO_ACK);
        af.setDataPolicy(DataPolicy.REPLICATE);
        if (addListener) {
          CacheListener cl = new CacheListenerAdapter() {
            @Override
            public void afterUpdate(EntryEvent event) {
              // make the slow receiver event slower!
              try {
                Thread.sleep(500);
              } catch (InterruptedException shuttingDown) {
                fail("interrupted");
              }
            }
          };
          af.setCacheListener(cl);
        } else {
          CacheListener cl = new CacheListenerAdapter() {
            @Override
            public void afterCreate(EntryEvent event) {
              if (event.getCallbackArgument() != null) {
                lastCallback = event.getCallbackArgument();
              }
              if (event.getKey().equals("sleepkey")) {
                int sleepMs = ((Integer) event.getNewValue()).intValue();
                try {
                  Thread.sleep(sleepMs);
                } catch (InterruptedException ignore) {
                  fail("interrupted");
                }
              }
            }

            @Override
            public void afterUpdate(EntryEvent event) {
              if (event.getCallbackArgument() != null) {
                lastCallback = event.getCallbackArgument();
              }
              if (event.getKey().equals("sleepkey")) {
                int sleepMs = ((Integer) event.getNewValue()).intValue();
                try {
                  Thread.sleep(sleepMs);
                } catch (InterruptedException ignore) {
                  fail("interrupted");
                }
              }
            }

            @Override
            public void afterInvalidate(EntryEvent event) {
              if (event.getCallbackArgument() != null) {
                lastCallback = event.getCallbackArgument();
              }
            }

            @Override
            public void afterDestroy(EntryEvent event) {
              if (event.getCallbackArgument() != null) {
                lastCallback = event.getCallbackArgument();
              }
            }
          };
          af.setCacheListener(cl);
        }
        Region r1 = createRootRegion("slowrec", af.create());
        // place holder so we receive updates
        r1.create("key", "value");
      }
    });
  }

  protected static final String CHECK_INVALID = "CHECK_INVALID";

  private void checkLastValueInOtherVm(final String lastValue, final Object lcb) {
    VM vm = getOtherVm();
    vm.invoke(new CacheSerializableRunnable("check last value") {
      @Override
      public void run2() throws CacheException {
        Region r1 = getRootRegion("slowrec");
        if (lcb != null) {
          WaitCriterion ev = new WaitCriterion() {
            @Override
            public boolean done() {
              return lcb.equals(lastCallback);
            }

            @Override
            public String description() {
              return "waiting for callback";
            }
          };
          GeodeAwaitility.await().untilAsserted(ev);
          assertEquals(lcb, lastCallback);
        }
        if (lastValue == null) {
          final Region r = r1;
          WaitCriterion ev = new WaitCriterion() {
            @Override
            public boolean done() {
              return r.getEntry("key") == null;
            }

            @Override
            public String description() {
              return "waiting for key to become null";
            }
          };
          GeodeAwaitility.await().untilAsserted(ev);
          assertEquals(null, r1.getEntry("key"));
        } else if (CHECK_INVALID.equals(lastValue)) {
          // should be invalid
          {
            final Region r = r1;
            WaitCriterion ev = new WaitCriterion() {
              @Override
              public boolean done() {
                Entry e = r.getEntry("key");
                if (e == null) {
                  return false;
                }
                return e.getValue() == null;
              }

              @Override
              public String description() {
                return "waiting for invalidate";
              }
            };
            GeodeAwaitility.await().untilAsserted(ev);
          }
        } else {
          {
            int retryCount = 1000;
            Region.Entry re = null;
            Object value = null;
            while (retryCount-- > 0) {
              re = r1.getEntry("key");
              if (re != null) {
                value = re.getValue();
                if (value != null && value.equals(lastValue)) {
                  break;
                }
              }
              try {
                Thread.sleep(50);
              } catch (InterruptedException ignore) {
                fail("interrupted");
              }
            }
            assertNotNull(re);
            assertNotNull(value);
            assertEquals(lastValue, value);
          }
        }
      }
    });
  }

  private void forceQueueFlush() {
    FORCE_ASYNC_QUEUE = false;
    final DMStats stats = getSystem().getDistributionManager().getStats();
    WaitCriterion ev = new WaitCriterion() {
      @Override
      public boolean done() {
        return stats.getAsyncThreads() == 0;
      }

      @Override
      public String description() {
        return "Waiting for async threads to disappear";
      }
    };
    GeodeAwaitility.await().untilAsserted(ev);
  }

  private void forceQueuing(final Region r) throws CacheException {
    FORCE_ASYNC_QUEUE = true;
    final DMStats stats = getSystem().getDistributionManager().getStats();
    r.put("forcekey", "forcevalue");

    // wait for the flusher to get its first flush in progress
    WaitCriterion ev = new WaitCriterion() {
      @Override
      public boolean done() {
        return stats.getAsyncQueueFlushesInProgress() != 0;
      }

      @Override
      public String description() {
        return "waiting for flushes to start";
      }
    };
    GeodeAwaitility.await().untilAsserted(ev);
  }

  /**
   * Make sure that noack puts to a receiver will eventually queue and then catch up.
   */
  @Test
  public void testNoAck() throws Exception {
    final AttributesFactory factory = new AttributesFactory();
    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
    final Region r = createRootRegion("slowrec", factory.create());
    final DMStats stats = getSystem().getDistributionManager().getStats();

    // create receiver in vm0 with queuing enabled
    Properties p = new Properties();
    p.setProperty(ASYNC_DISTRIBUTION_TIMEOUT, "1");
    doCreateOtherVm(p, false);

    int repeatCount = 2;
    int count = 0;
    while (repeatCount-- > 0) {
      forceQueuing(r);
      final Object key = "key";
      long queuedMsgs = stats.getAsyncQueuedMsgs();
      long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
      long queueSize = stats.getAsyncQueueSize();
      String lastValue = "";
      final long intialQueuedMsgs = queuedMsgs;
      long curQueuedMsgs = queuedMsgs - dequeuedMsgs;
      try {
        // loop while we still have queued the initially queued msgs
        // OR the cur # of queued msgs < 6
        while (dequeuedMsgs < intialQueuedMsgs || curQueuedMsgs <= 6) {
          String value = "count=" + count;
          lastValue = value;
          r.put(key, value);
          count++;
          queueSize = stats.getAsyncQueueSize();
          queuedMsgs = stats.getAsyncQueuedMsgs();
          dequeuedMsgs = stats.getAsyncDequeuedMsgs();
          curQueuedMsgs = queuedMsgs - dequeuedMsgs;
        }
        getLogWriter()
            .info("After " + count + " " + " puts slowrec mode kicked in by queuing " + queuedMsgs
                + " for a total size of " + queueSize);
      } finally {
        forceQueueFlush();
      }
      WaitCriterion ev = new WaitCriterion() {
        @Override
        public boolean done() {
          return stats.getAsyncQueueSize() == 0;
        }

        @Override
        public String description() {
          return "Waiting for queues to empty";
        }
      };
      final long start = currentTimeMillis();
      GeodeAwaitility.await().untilAsserted(ev);
      final long finish = currentTimeMillis();
      getLogWriter()
          .info("After " + (finish - start) + " ms async msgs where flushed. A total of "
              + stats.getAsyncDequeuedMsgs() + " were flushed. lastValue=" + lastValue);

      checkLastValueInOtherVm(lastValue, null);
    }
  }

  /**
   * Create a region named AckRegion with ACK scope
   */
  protected Region createAckRegion(boolean mirror, boolean conflate) throws CacheException {
    final AttributesFactory factory = new AttributesFactory();
    factory.setScope(Scope.DISTRIBUTED_ACK);
    if (mirror) {
      factory.setDataPolicy(DataPolicy.REPLICATE);
    }
    if (conflate) {
      factory.setEnableAsyncConflation(true);
    }
    final Region r = createRootRegion("AckRegion", factory.create());
    return r;
  }

  /**
   * Make sure that noack puts to a receiver will eventually queue and then catch up with conflation
   */
  @Test
  public void testNoAckConflation() throws Exception {
    final AttributesFactory factory = new AttributesFactory();
    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
    factory.setEnableAsyncConflation(true);
    final Region r = createRootRegion("slowrec", factory.create());
    final DMStats stats = getSystem().getDistributionManager().getStats();

    // create receiver in vm0 with queuing enabled
    Properties p = new Properties();
    p.setProperty(ASYNC_DISTRIBUTION_TIMEOUT, "1");
    doCreateOtherVm(p, false);

    forceQueuing(r);
    final Object key = "key";
    int count = 0;
    final long initialConflatedMsgs = stats.getAsyncConflatedMsgs();
    String lastValue = "";
    final long intialDeQueuedMsgs = stats.getAsyncDequeuedMsgs();
    long start = 0;
    try {
      while ((stats.getAsyncConflatedMsgs() - initialConflatedMsgs) < 1000) {
        String value = "count=" + count;
        lastValue = value;
        r.put(key, value);
        count++;
      }
      start = System.currentTimeMillis();
    } finally {
      forceQueueFlush();
    }
    final long finish = System.currentTimeMillis();
    LogWriterUtils.getLogWriter()
        .info("After " + (finish - start) + " ms async msgs where flushed. A total of "
            + (stats.getAsyncDequeuedMsgs() - intialDeQueuedMsgs)
            + " were flushed. Leaving a queue size of " + stats.getAsyncQueueSize()
            + ". The lastValue was " + lastValue);

    checkLastValueInOtherVm(lastValue, null);
  }

  /**
   * make sure ack does not hang make sure two ack updates do not conflate but are both queued
   */
  @Test
  public void testAckConflation() throws Exception {
    final AttributesFactory factory = new AttributesFactory();
    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
    factory.setEnableAsyncConflation(true);
    final Region r = createRootRegion("slowrec", factory.create());
    final Region ar = createAckRegion(false, true);
    ar.create("ackKey", "ackValue");

    final DMStats stats = getSystem().getDistributionManager().getStats();

    // create receiver in vm0 with queuing enabled
    Properties p = new Properties();
    p.setProperty(ASYNC_DISTRIBUTION_TIMEOUT, "2");
    doCreateOtherVm(p, false);

    forceQueuing(r);
    {
      // make sure ack does not hang
      // make sure two ack updates do not conflate but are both queued
      long startQueuedMsgs = stats.getAsyncQueuedMsgs();
      long startConflatedMsgs = stats.getAsyncConflatedMsgs();
      Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
          ar.put("ackKey", "ackValue");
        }
      });
      t.start();
      Thread t2 = new Thread(new Runnable() {
        @Override
        public void run() {
          ar.put("ackKey", "ackValue");
        }
      });
      t2.start();
      // give threads a chance to get queued
      try {
        Thread.sleep(100);
      } catch (InterruptedException ignore) {
        fail("interrupted");
      }
      forceQueueFlush();
      ThreadUtils.join(t, 2 * 1000);
      ThreadUtils.join(t2, 2 * 1000);
      long endQueuedMsgs = stats.getAsyncQueuedMsgs();
      long endConflatedMsgs = stats.getAsyncConflatedMsgs();
      assertEquals(startConflatedMsgs, endConflatedMsgs);
      // queue should be flushed by the time we get an ack
      assertEquals(endQueuedMsgs, stats.getAsyncDequeuedMsgs());
      assertEquals(startQueuedMsgs + 2, endQueuedMsgs);
    }
  }

  /**
   * Make sure that only sequences of updates are conflated Also checks that sending to a conflating
   * region and non-conflating region does the correct thing. Test disabled because it
   * intermittently fails due to race conditions in test. This has been fixed in congo's tests. See
   * bug 35357.
   */
  @Test
  public void testConflationSequence() throws Exception {
    final AttributesFactory factory = new AttributesFactory();
    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
    factory.setEnableAsyncConflation(true);
    final Region r = createRootRegion("slowrec", factory.create());
    factory.setEnableAsyncConflation(false);
    final Region noConflate = createRootRegion("noConflate", factory.create());
    final DMStats stats = getSystem().getDistributionManager().getStats();

    // create receiver in vm0 with queuing enabled
    Properties p = new Properties();
    p.setProperty(ASYNC_DISTRIBUTION_TIMEOUT, "1");
    doCreateOtherVm(p, false);
    {
      VM vm = getOtherVm();
      vm.invoke(new CacheSerializableRunnable("create noConflate") {
        @Override
        public void run2() throws CacheException {
          AttributesFactory af = new AttributesFactory();
          af.setScope(Scope.DISTRIBUTED_NO_ACK);
          af.setDataPolicy(DataPolicy.REPLICATE);
          createRootRegion("noConflate", af.create());
        }
      });
    }

    // now make sure update+destroy does not conflate
    final Object key = "key";
    LogWriterUtils.getLogWriter().info("[testConflationSequence] about to force queuing");
    forceQueuing(r);

    int count = 0;
    String value = "";
    String lastValue = value;
    Object mylcb = null;
    long initialConflatedMsgs = stats.getAsyncConflatedMsgs();
    int endCount = count + 60;

    LogWriterUtils.getLogWriter().info("[testConflationSequence] about to build up queue");
    long begin = System.currentTimeMillis();
    while (count < endCount) {
      value = "count=" + count;
      lastValue = value;
      r.create(key, value);
      count++;
      value = "count=" + count;
      lastValue = value;
      r.put(key, value);
      count++;
      mylcb = value;
      r.destroy(key, mylcb);
      count++;
      lastValue = null;
      assertTrue(System.currentTimeMillis() < begin + 1000 * 60 * 2);
    }
    assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
    forceQueueFlush();
    checkLastValueInOtherVm(lastValue, mylcb);

    // now make sure create+update+localDestroy does not conflate
    LogWriterUtils.getLogWriter()
        .info("[testConflationSequence] force queuing create-update-destroy");
    forceQueuing(r);
    initialConflatedMsgs = stats.getAsyncConflatedMsgs();
    endCount = count + 40;

    LogWriterUtils.getLogWriter().info("[testConflationSequence] create-update-destroy");
    begin = System.currentTimeMillis();
    while (count < endCount) {
      value = "count=" + count;
      lastValue = value;
      r.create(key, value);
      count++;
      value = "count=" + count;
      lastValue = value;
      r.put(key, value);
      count++;
      r.localDestroy(key);
      assertTrue(System.currentTimeMillis() < begin + 1000 * 60 * 2);
    }
    assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
    forceQueueFlush();
    checkLastValueInOtherVm(lastValue, null);

    // now make sure update+invalidate does not conflate
    LogWriterUtils.getLogWriter().info("[testConflationSequence] force queuing update-invalidate");
    forceQueuing(r);
    initialConflatedMsgs = stats.getAsyncConflatedMsgs();
    value = "count=" + count;
    lastValue = value;
    r.create(key, value);
    count++;
    endCount = count + 40;

    LogWriterUtils.getLogWriter().info("[testConflationSequence] update-invalidate");
    begin = System.currentTimeMillis();
    while (count < endCount) {
      value = "count=" + count;
      lastValue = value;
      r.put(key, value);
      count++;
      r.invalidate(key);
      count++;
      lastValue = CHECK_INVALID;
      assertTrue(System.currentTimeMillis() < begin + 1000 * 60 * 2);
    }
    assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
    forceQueueFlush();
    LogWriterUtils.getLogWriter().info("[testConflationSequence] assert other vm");
    checkLastValueInOtherVm(lastValue, null);

    r.destroy(key);

    // now make sure updates to a conflating region are conflated even while
    // updates to a non-conflating are not.
    LogWriterUtils.getLogWriter().info("[testConflationSequence] conflate & no-conflate regions");
    forceQueuing(r);
    final int initialAsyncSocketWrites = stats.getAsyncSocketWrites();

    value = "count=" + count;
    lastValue = value;
    long conflatedMsgs = stats.getAsyncConflatedMsgs();
    long queuedMsgs = stats.getAsyncQueuedMsgs();
    r.create(key, value);
    queuedMsgs++;
    assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
    assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
    r.put(key, value);
    queuedMsgs++;
    assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
    assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
    noConflate.create(key, value);
    queuedMsgs++;
    assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
    assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
    noConflate.put(key, value);
    queuedMsgs++;
    assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
    assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
    count++;
    endCount = count + 80;

    begin = System.currentTimeMillis();
    LogWriterUtils.getLogWriter()
        .info("[testConflationSequence:DEBUG] count=" + count + " queuedMsgs="
            + stats.getAsyncQueuedMsgs() + " conflatedMsgs=" + stats.getAsyncConflatedMsgs()
            + " dequeuedMsgs=" + stats.getAsyncDequeuedMsgs() + " asyncSocketWrites="
            + stats.getAsyncSocketWrites());
    while (count < endCount) {
      // make sure we continue to have a flush in progress
      assertEquals(1, stats.getAsyncThreads());
      assertEquals(1, stats.getAsyncQueues());
      assertTrue(stats.getAsyncQueueFlushesInProgress() > 0);
      // make sure we are not completing any flushing while this loop is in progress
      assertEquals(initialAsyncSocketWrites, stats.getAsyncSocketWrites());
      value = "count=" + count;
      lastValue = value;
      r.put(key, value);
      count++;
      // make sure it was conflated and not queued
      assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
      conflatedMsgs++;
      assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
      noConflate.put(key, value);
      // make sure it was queued and not conflated
      queuedMsgs++;
      assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
      assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
      assertTrue(System.currentTimeMillis() < begin + 1000 * 60 * 2);
    }

    forceQueueFlush();
    LogWriterUtils.getLogWriter().info("[testConflationSequence] assert other vm");
    checkLastValueInOtherVm(lastValue, null);
  }

  /**
   * Make sure that exceeding the queue size limit causes a disconnect.
   */
  @Test
  public void testSizeDisconnect() throws Exception {
    final String expected =
        "org.apache.geode.internal.tcp.ConnectionException: Forced disconnect sent to"
            + "||java.io.IOException: Broken pipe";
    final String addExpected = "<ExpectedException action=add>" + expected + "</ExpectedException>";
    final String removeExpected =
        "<ExpectedException action=remove>" + expected + "</ExpectedException>";

    final AttributesFactory factory = new AttributesFactory();
    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
    final Region r = createRootRegion("slowrec", factory.create());
    final DistributionManager dm = getSystem().getDistributionManager();
    final DMStats stats = dm.getStats();
    // set others before vm0 connects
    final Set others = dm.getOtherDistributionManagerIds();

    // create receiver in vm0 with queuing enabled
    Properties p = new Properties();
    p.setProperty(ASYNC_DISTRIBUTION_TIMEOUT, "5");
    p.setProperty(ASYNC_MAX_QUEUE_SIZE, "1"); // 1 meg
    doCreateOtherVm(p, false);

    final Object key = "key";
    final int VALUE_SIZE = 1024 * 100; // .1M async-max-queue-size should give us 10 of these 100K
                                       // msgs before queue full
    final byte[] value = new byte[VALUE_SIZE];
    int count = 0;
    forceQueuing(r);
    long queuedMsgs = stats.getAsyncQueuedMsgs();
    long queueSize = stats.getAsyncQueueSize();

    getCache().getLogger().info(addExpected);
    try {
      while (stats.getAsyncQueueSizeExceeded() == 0 && stats.getAsyncQueueTimeouts() == 0) {
        r.put(key, value);
        count++;
        if (stats.getAsyncQueueSize() > 0) {
          queuedMsgs = stats.getAsyncQueuedMsgs();
          queueSize = stats.getAsyncQueueSize();
        }
        if (count > 100) {
          fail("should have exceeded max-queue-size by now");
        }
      }
      getLogWriter()
          .info("After " + count + " " + VALUE_SIZE
              + " byte puts slowrec mode kicked in but the queue filled when its size reached "
              + queueSize + " with " + queuedMsgs + " msgs");
      // make sure we lost a connection to vm0
      WaitCriterion ev = new WaitCriterion() {
        @Override
        public boolean done() {
          return dm.getOtherDistributionManagerIds().size() <= others.size()
              && stats.getAsyncQueueSize() == 0;
        }

        @Override
        public String description() {
          return "waiting for connection loss";
        }
      };
      GeodeAwaitility.await().untilAsserted(ev);
    } finally {
      forceQueueFlush();
      getCache().getLogger().info(removeExpected);
    }
    assertEquals(others, dm.getOtherDistributionManagerIds());
    assertEquals(0, stats.getAsyncQueueSize());
  }

  /**
   * Make sure that exceeding the async-queue-timeout causes a disconnect.
   * <p>
   * [bruce] This test was disabled when the SlowRecDUnitTest was re-enabled in build.xml in the
   * splitbrainNov07 branch. It had been disabled since June 2006 due to hangs. Some of the tests,
   * like this one, still need work because the periodically (some quite often) fail.
   */
  @Test
  public void testTimeoutDisconnect() throws Exception {
    final String expected =
        "org.apache.geode.internal.tcp.ConnectionException: Forced disconnect sent to"
            + "||java.io.IOException: Broken pipe";
    final String addExpected = "<ExpectedException action=add>" + expected + "</ExpectedException>";
    final String removeExpected =
        "<ExpectedException action=remove>" + expected + "</ExpectedException>";

    final AttributesFactory factory = new AttributesFactory();
    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
    final Region r = createRootRegion("slowrec", factory.create());
    final DistributionManager dm = getSystem().getDistributionManager();
    final DMStats stats = dm.getStats();
    // set others before vm0 connects
    final Set others = dm.getOtherDistributionManagerIds();

    // create receiver in vm0 with queuing enabled
    Properties p = new Properties();
    p.setProperty(ASYNC_DISTRIBUTION_TIMEOUT, "5");
    p.setProperty(ASYNC_QUEUE_TIMEOUT, "500"); // 500 ms
    doCreateOtherVm(p, true);


    final Object key = "key";
    final int VALUE_SIZE = 1024; // 1k
    final byte[] value = new byte[VALUE_SIZE];
    int count = 0;
    long queuedMsgs = stats.getAsyncQueuedMsgs();
    long queueSize = stats.getAsyncQueueSize();
    final long timeoutLimit = System.currentTimeMillis() + 5000;

    getCache().getLogger().info(addExpected);
    try {
      while (stats.getAsyncQueueTimeouts() == 0) {
        r.put(key, value);
        count++;
        if (stats.getAsyncQueueSize() > 0) {
          queuedMsgs = stats.getAsyncQueuedMsgs();
          queueSize = stats.getAsyncQueueSize();
        }
        if (currentTimeMillis() > timeoutLimit) {
          fail("should have exceeded async-queue-timeout by now");
        }
      }
      getLogWriter()
          .info("After " + count + " " + VALUE_SIZE
              + " byte puts slowrec mode kicked in but the queue filled when its size reached "
              + queueSize + " with " + queuedMsgs + " msgs");
      // make sure we lost a connection to vm0
      WaitCriterion ev = new WaitCriterion() {
        @Override
        public boolean done() {
          if (dm.getOtherDistributionManagerIds().size() > others.size()) {
            return false;
          }
          return stats.getAsyncQueueSize() == 0;
        }

        @Override
        public String description() {
          return "waiting for departure";
        }
      };
      GeodeAwaitility.await().untilAsserted(ev);
    } finally {
      getCache().getLogger().info(removeExpected);
    }
    assertEquals(others, dm.getOtherDistributionManagerIds());
    assertEquals(0, stats.getAsyncQueueSize());
  }

  private static final String KEY_SLEEP = "KEY_SLEEP";
  private static final String KEY_WAIT = "KEY_WAIT";
  private static final String KEY_DISCONNECT = "KEY_DISCONNECT";

  protected static final int CALLBACK_CREATE = 0;
  protected static final int CALLBACK_UPDATE = 1;
  protected static final int CALLBACK_INVALIDATE = 2;
  protected static final int CALLBACK_DESTROY = 3;
  protected static final int CALLBACK_REGION_INVALIDATE = 4;

  protected static final Integer CALLBACK_CREATE_INTEGER = new Integer(CALLBACK_CREATE);
  protected static final Integer CALLBACK_UPDATE_INTEGER = new Integer(CALLBACK_UPDATE);
  protected static final Integer CALLBACK_INVALIDATE_INTEGER = new Integer(CALLBACK_INVALIDATE);
  protected static final Integer CALLBACK_DESTROY_INTEGER = new Integer(CALLBACK_DESTROY);
  protected static final Integer CALLBACK_REGION_INVALIDATE_INTEGER =
      new Integer(CALLBACK_REGION_INVALIDATE);

  private static class CallbackWrapper {
    public final Object callbackArgument;
    public final int callbackType;

    public CallbackWrapper(Object callbackArgument, int callbackType) {
      this.callbackArgument = callbackArgument;
      this.callbackType = callbackType;
    }

    public String toString() {
      return "CallbackWrapper: " + callbackArgument.toString() + " of type " + callbackType;
    }
  }

  protected static class ControlListener extends CacheListenerAdapter {
    public final LinkedList callbackArguments = new LinkedList();
    public final LinkedList callbackTypes = new LinkedList();
    public final Object CONTROL_LOCK = new Object();

    @Override
    public void afterCreate(EntryEvent event) {
      LogWriterUtils.getLogWriter()
          .info(event.getRegion().getName() + " afterCreate " + event.getKey());
      synchronized (this.CONTROL_LOCK) {
        if (event.getCallbackArgument() != null) {
          this.callbackArguments
              .add(new CallbackWrapper(event.getCallbackArgument(), CALLBACK_CREATE));
          this.callbackTypes.add(CALLBACK_CREATE_INTEGER);
          this.CONTROL_LOCK.notifyAll();
        }
      }
      processEvent(event);
    }

    @Override
    public void afterUpdate(EntryEvent event) {
      LogWriterUtils.getLogWriter()
          .info(event.getRegion().getName() + " afterUpdate " + event.getKey());
      synchronized (this.CONTROL_LOCK) {
        if (event.getCallbackArgument() != null) {
          this.callbackArguments
              .add(new CallbackWrapper(event.getCallbackArgument(), CALLBACK_UPDATE));
          this.callbackTypes.add(CALLBACK_UPDATE_INTEGER);
          this.CONTROL_LOCK.notifyAll();
        }
      }
      processEvent(event);
    }

    @Override
    public void afterInvalidate(EntryEvent event) {
      synchronized (this.CONTROL_LOCK) {
        if (event.getCallbackArgument() != null) {
          this.callbackArguments
              .add(new CallbackWrapper(event.getCallbackArgument(), CALLBACK_INVALIDATE));
          this.callbackTypes.add(CALLBACK_INVALIDATE_INTEGER);
          this.CONTROL_LOCK.notifyAll();
        }
      }
    }

    @Override
    public void afterDestroy(EntryEvent event) {
      synchronized (this.CONTROL_LOCK) {
        if (event.getCallbackArgument() != null) {
          this.callbackArguments
              .add(new CallbackWrapper(event.getCallbackArgument(), CALLBACK_DESTROY));
          this.callbackTypes.add(CALLBACK_DESTROY_INTEGER);
          this.CONTROL_LOCK.notifyAll();
        }
      }
    }

    @Override
    public void afterRegionInvalidate(RegionEvent event) {
      synchronized (this.CONTROL_LOCK) {
        if (event.getCallbackArgument() != null) {
          this.callbackArguments
              .add(new CallbackWrapper(event.getCallbackArgument(), CALLBACK_REGION_INVALIDATE));
          this.callbackTypes.add(CALLBACK_REGION_INVALIDATE_INTEGER);
          this.CONTROL_LOCK.notifyAll();
        }
      }
    }

    private void processEvent(EntryEvent event) {
      if (event.getKey().equals(KEY_SLEEP)) {
        processSleep(event);
      } else if (event.getKey().equals(KEY_WAIT)) {
        processWait(event);
      } else if (event.getKey().equals(KEY_DISCONNECT)) {
        processDisconnect(event);
      }
    }

    private void processSleep(EntryEvent event) {
      int sleepMs = ((Integer) event.getNewValue()).intValue();
      LogWriterUtils.getLogWriter().info("[processSleep] sleeping for " + sleepMs);
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException ignore) {
        fail("interrupted");
      }
    }

    private void processWait(EntryEvent event) {
      int sleepMs = ((Integer) event.getNewValue()).intValue();
      LogWriterUtils.getLogWriter().info("[processWait] waiting for " + sleepMs);
      synchronized (this.CONTROL_LOCK) {
        try {
          this.CONTROL_LOCK.wait(sleepMs);
        } catch (InterruptedException ignore) {
          return;
        }
      }
    }

    private void processDisconnect(EntryEvent event) {
      LogWriterUtils.getLogWriter().info("[processDisconnect] disconnecting");
      disconnectFromDS();
    }
  };

  /**
   * Make sure a multiple no ack regions conflate properly. [bruce] disabled when use of this dunit
   * test class was reenabled in the splitbrainNov07 branch. The class had been disabled since June
   * 2006 r13222 in the trunk. This test is failing because conflation isn't kicking in for some
   * reason.
   */
  @Test
  public void testMultipleRegionConflation() throws Exception {
    try {
      doTestMultipleRegionConflation();
    } finally {
      // make sure other vm was notified even if test failed
      getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
        @Override
        public void run() {
          synchronized (doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
            doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.notifyAll();
          }
        }
      });
    }
  }

  protected static ControlListener doTestMultipleRegionConflation_R1_Listener;
  protected static ControlListener doTestMultipleRegionConflation_R2_Listener;

  private void doTestMultipleRegionConflation() throws Exception {
    final AttributesFactory factory = new AttributesFactory();
    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
    factory.setEnableAsyncConflation(true);
    final Region r1 = createRootRegion("slowrec1", factory.create());
    final Region r2 = createRootRegion("slowrec2", factory.create());

    assertTrue(getSystem().isConnected());
    assertNotNull(r1);
    assertFalse(r1.isDestroyed());
    assertNotNull(getCache());
    assertNotNull(getCache().getRegion("slowrec1"));
    assertNotNull(r2);
    assertFalse(r2.isDestroyed());
    assertNotNull(getCache());
    assertNotNull(getCache().getRegion("slowrec2"));

    final DistributionManager dm = getSystem().getDistributionManager();
    final Serializable controllerVM = dm.getDistributionManagerId();
    final DMStats stats = dm.getStats();
    final int millisToWait = 1000 * 60 * 5; // 5 minutes

    // set others before vm0 connects
    long initialQueuedMsgs = stats.getAsyncQueuedMsgs();

    // create receiver in vm0 with queuing enabled
    final Properties p = new Properties();
    p.setProperty(ASYNC_DISTRIBUTION_TIMEOUT, "5");
    p.setProperty(ASYNC_QUEUE_TIMEOUT, "86400000"); // max value
    p.setProperty(ASYNC_MAX_QUEUE_SIZE, "1024"); // max value

    getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
      @Override
      public void run2() throws CacheException {
        getSystem(p);

        DistributionManager dm = getSystem().getDistributionManager();
        assertTrue(dm.getDistributionManagerIds().contains(controllerVM));

        AttributesFactory af = new AttributesFactory();
        af.setScope(Scope.DISTRIBUTED_NO_ACK);
        af.setDataPolicy(DataPolicy.REPLICATE);

        doTestMultipleRegionConflation_R1_Listener = new ControlListener();
        af.setCacheListener(doTestMultipleRegionConflation_R1_Listener);
        createRootRegion("slowrec1", af.create());

        doTestMultipleRegionConflation_R2_Listener = new ControlListener();
        af.setCacheListener(doTestMultipleRegionConflation_R2_Listener);
        createRootRegion("slowrec2", af.create());
      }
    });

    // put vm0 cache listener into wait
    LogWriterUtils.getLogWriter()
        .info("[doTestMultipleRegionConflation] about to put vm0 into wait");
    r1.put(KEY_WAIT, new Integer(millisToWait));

    // build up queue size
    LogWriterUtils.getLogWriter()
        .info("[doTestMultipleRegionConflation] building up queue size...");
    final Object key = "key";
    final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
    final int VALUE_SIZE = socketBufferSize * 3;
    // final int VALUE_SIZE = 1024 * 1024 ; // 1 MB
    final byte[] value = new byte[VALUE_SIZE];

    int count = 0;
    while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
      count++;
      r1.put(key, value);
    }

    LogWriterUtils.getLogWriter()
        .info("[doTestMultipleRegionConflation] After " + count + " puts of size " + VALUE_SIZE
            + " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());

    // put values that will be asserted
    final Object key1 = "key1";
    final Object key2 = "key2";
    Object putKey = key1;
    boolean flag = true;
    for (int i = 0; i < 30; i++) {
      if (i == 10)
        putKey = key2;
      if (flag) {
        if (i == 6) {
          r1.invalidate(putKey, new Integer(i));
        } else if (i == 24) {
          r1.invalidateRegion(new Integer(i));
        } else {
          r1.put(putKey, value, new Integer(i));
        }
      } else {
        if (i == 15) {
          r2.destroy(putKey, new Integer(i));
        } else {
          r2.put(putKey, value, new Integer(i));
        }
      }
      flag = !flag;
    }

    // r1: key1, 0, create
    // r1: key1, 4, update
    // r1: key1, 6, invalidate
    // r1: key1, 8, update

    // r1: key2, 10, create
    // r1: 24, invalidateRegion
    // r1: key2, 28, update

    // r2: key1, 1, create
    // r2: key1, 9, update

    // r2: key2, 11, create
    // r2: key2, 13, update
    // r2: key2, 15, destroy
    // r2: key2, 17, create
    // r2: key2, 29, update

    final int[] r1ExpectedArgs = new int[] {0, 4, 6, 8, 10, 24, 28};
    final int[] r1ExpectedTypes = new int[] /* 0, 1, 2, 1, 0, 4, 1 */
    {CALLBACK_CREATE, CALLBACK_UPDATE, CALLBACK_INVALIDATE, CALLBACK_UPDATE, CALLBACK_CREATE,
        CALLBACK_REGION_INVALIDATE, CALLBACK_UPDATE};

    final int[] r2ExpectedArgs = new int[] {1, 9, 11, 13, 15, 17, 29};
    final int[] r2ExpectedTypes = new int[] {CALLBACK_CREATE, CALLBACK_UPDATE, CALLBACK_CREATE,
        CALLBACK_UPDATE, CALLBACK_DESTROY, CALLBACK_CREATE, CALLBACK_UPDATE};

    // send notify to vm0
    LogWriterUtils.getLogWriter().info("[doTestMultipleRegionConflation] wake up vm0");
    getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
      @Override
      public void run() {
        synchronized (doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
          doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.notifyAll();
        }
      }
    });

    // wait for queue to be flushed
    LogWriterUtils.getLogWriter().info("[doTestMultipleRegionConflation] wait for vm0");
    getOtherVm().invoke(new SerializableRunnable("Wait for other vm") {
      @Override
      public void run() {
        try {
          synchronized (doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
            while (doTestMultipleRegionConflation_R1_Listener.callbackArguments
                .size() < r1ExpectedArgs.length) {
              doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.wait(millisToWait);
            }
          }
          synchronized (doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK) {
            while (doTestMultipleRegionConflation_R2_Listener.callbackArguments
                .size() < r2ExpectedArgs.length) {
              doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK.wait(millisToWait);
            }
          }
        } catch (InterruptedException ignore) {
          fail("interrupted");
        }
      }
    });

    // assert values on both listeners
    LogWriterUtils.getLogWriter()
        .info("[doTestMultipleRegionConflation] assert callback arguments");
    getOtherVm().invoke(new SerializableRunnable("Assert callback arguments") {
      @Override
      public void run() {
        synchronized (doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
          LogWriterUtils.getLogWriter()
              .info("doTestMultipleRegionConflation_R1_Listener.callbackArguments="
                  + doTestMultipleRegionConflation_R1_Listener.callbackArguments);
          LogWriterUtils.getLogWriter()
              .info("doTestMultipleRegionConflation_R1_Listener.callbackTypes="
                  + doTestMultipleRegionConflation_R1_Listener.callbackTypes);
          assertEquals(doTestMultipleRegionConflation_R1_Listener.callbackArguments.size(),
              doTestMultipleRegionConflation_R1_Listener.callbackTypes.size());
          int i = 0;
          for (Iterator iter =
              doTestMultipleRegionConflation_R1_Listener.callbackArguments.iterator(); iter
                  .hasNext();) {
            CallbackWrapper wrapper = (CallbackWrapper) iter.next();
            assertEquals(new Integer(r1ExpectedArgs[i]), wrapper.callbackArgument);
            assertEquals(new Integer(r1ExpectedTypes[i]),
                doTestMultipleRegionConflation_R1_Listener.callbackTypes.get(i));
            i++;
          }
        }
        synchronized (doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK) {
          LogWriterUtils.getLogWriter()
              .info("doTestMultipleRegionConflation_R2_Listener.callbackArguments="
                  + doTestMultipleRegionConflation_R2_Listener.callbackArguments);
          LogWriterUtils.getLogWriter()
              .info("doTestMultipleRegionConflation_R2_Listener.callbackTypes="
                  + doTestMultipleRegionConflation_R2_Listener.callbackTypes);
          assertEquals(doTestMultipleRegionConflation_R2_Listener.callbackArguments.size(),
              doTestMultipleRegionConflation_R2_Listener.callbackTypes.size());
          int i = 0;
          for (Iterator iter =
              doTestMultipleRegionConflation_R2_Listener.callbackArguments.iterator(); iter
                  .hasNext();) {
            CallbackWrapper wrapper = (CallbackWrapper) iter.next();
            assertEquals(new Integer(r2ExpectedArgs[i]), wrapper.callbackArgument);
            assertEquals(new Integer(r2ExpectedTypes[i]),
                doTestMultipleRegionConflation_R2_Listener.callbackTypes.get(i));
            i++;
          }
        }
      }
    });
  }

  /**
   * Make sure a disconnect causes queue memory to be released.
   */
  @Test
  public void testDisconnectCleanup() throws Exception {
    try {
      doTestDisconnectCleanup();
    } finally {
      // make sure other vm was notified even if test failed
      getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
        @Override
        public void run() {
          synchronized (doTestDisconnectCleanup_Listener.CONTROL_LOCK) {
            doTestDisconnectCleanup_Listener.CONTROL_LOCK.notifyAll();
          }
        }
      });
    }
  }

  protected static ControlListener doTestDisconnectCleanup_Listener;

  private void doTestDisconnectCleanup() throws Exception {
    final AttributesFactory factory = new AttributesFactory();
    factory.setScope(DISTRIBUTED_NO_ACK);
    final Region r = createRootRegion("slowrec", factory.create());
    final DistributionManager dm = getSystem().getDistributionManager();
    final DMStats stats = dm.getStats();
    // set others before vm0 connects
    final Set others = dm.getOtherDistributionManagerIds();
    long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
    final int initialQueues = stats.getAsyncQueues();

    // create receiver in vm0 with queuing enabled
    final Properties p = new Properties();
    p.setProperty(ASYNC_DISTRIBUTION_TIMEOUT, "5");
    p.setProperty(ASYNC_QUEUE_TIMEOUT, "86400000"); // max value
    p.setProperty(ASYNC_MAX_QUEUE_SIZE, "1024"); // max value

    getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
      @Override
      public void run2() throws CacheException {
        getSystem(p);
        AttributesFactory af = new AttributesFactory();
        af.setScope(DISTRIBUTED_NO_ACK);
        af.setDataPolicy(REPLICATE);

        doTestDisconnectCleanup_Listener = new ControlListener();
        af.setCacheListener(doTestDisconnectCleanup_Listener);
        createRootRegion("slowrec", af.create());
      }
    });

    // put vm0 cache listener into wait
    getLogWriter().info("[testDisconnectCleanup] about to put vm0 into wait");
    int millisToWait = 1000 * 60 * 5; // 5 minutes
    r.put(KEY_WAIT, new Integer(millisToWait));
    r.put(KEY_DISCONNECT, KEY_DISCONNECT);

    // build up queue size
    getLogWriter().info("[testDisconnectCleanup] building up queue size...");
    final Object key = "key";
    final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
    final int VALUE_SIZE = socketBufferSize * 3;
    // final int VALUE_SIZE = 1024 * 1024 ; // 1 MB
    final byte[] value = new byte[VALUE_SIZE];

    int count = 0;
    final long abortMillis = currentTimeMillis() + millisToWait;
    while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
      count++;
      r.put(key, value);
      assertFalse(currentTimeMillis() >= abortMillis);
    }

    getLogWriter().info("[testDisconnectCleanup] After " + count + " puts of size "
        + VALUE_SIZE + " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());

    while (stats.getAsyncQueuedMsgs() < 10 || stats.getAsyncQueueSize() < VALUE_SIZE * 10) {
      count++;
      r.put(key, value);
      assertFalse(currentTimeMillis() >= abortMillis);
    }
    assertTrue(stats.getAsyncQueuedMsgs() >= 10);

    while (stats.getAsyncQueues() < 1) {
      pause(100);
      assertFalse(currentTimeMillis() >= abortMillis);
    }

    getLogWriter()
        .info("[testDisconnectCleanup] After " + count + " puts of size " + VALUE_SIZE
            + " queue size has reached " + stats.getAsyncQueueSize()
            + " bytes and number of queues is " + stats.getAsyncQueues() + ".");

    assertTrue(stats.getAsyncQueueSize() >= (VALUE_SIZE * 5));
    assertEquals(initialQueues + 1, stats.getAsyncQueues());

    // assert vm0 is still connected
    assertTrue(dm.getOtherDistributionManagerIds().size() > others.size());

    // send notify to vm0
    getLogWriter().info("[testDisconnectCleanup] wake up vm0");
    getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
      @Override
      public void run() {
        synchronized (doTestDisconnectCleanup_Listener.CONTROL_LOCK) {
          doTestDisconnectCleanup_Listener.CONTROL_LOCK.notifyAll();
        }
      }
    });

    // make sure we lost a connection to vm0
    getLogWriter().info("[testDisconnectCleanup] wait for vm0 to disconnect");
    WaitCriterion ev = new WaitCriterion() {
      @Override
      public boolean done() {
        return dm.getOtherDistributionManagerIds().size() <= others.size();
      }

      @Override
      public String description() {
        return "waiting for disconnect";
      }
    };
    GeodeAwaitility.await().untilAsserted(ev);
    assertEquals(others, dm.getOtherDistributionManagerIds());

    // check free memory... perform wait loop with System.gc
    getLogWriter().info("[testDisconnectCleanup] wait for queue cleanup");
    ev = new WaitCriterion() {
      @Override
      public boolean done() {
        if (stats.getAsyncQueues() <= initialQueues) {
          return true;
        }
        getRuntime().gc();
        return false;
      }

      @Override
      public String description() {
        return "waiting for queue cleanup";
      }
    };
    GeodeAwaitility.await().untilAsserted(ev);
    assertEquals(initialQueues, stats.getAsyncQueues());
  }

  /**
   * Make sure a disconnect causes queue memory to be released.
   * <p>
   * [bruce] This test was disabled when the SlowRecDUnitTest was re-enabled in build.xml in the
   * splitbrainNov07 branch. It had been disabled since June 2006 due to hangs. Some of the tests,
   * like this one, still need work because the periodically (some quite often) fail.
   */
  @Test
  public void testPartialMessage() throws Exception {
    try {
      doTestPartialMessage();
    } finally {
      // make sure other vm was notified even if test failed
      getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
        @Override
        public void run() {
          synchronized (doTestPartialMessage_Listener.CONTROL_LOCK) {
            doTestPartialMessage_Listener.CONTROL_LOCK.notifyAll();
          }
        }
      });
    }
  }

  protected static ControlListener doTestPartialMessage_Listener;

  private void doTestPartialMessage() throws Exception {
    final AttributesFactory factory = new AttributesFactory();
    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
    factory.setEnableAsyncConflation(true);
    final Region r = createRootRegion("slowrec", factory.create());
    final DistributionManager dm = getSystem().getDistributionManager();
    final DMStats stats = dm.getStats();

    // set others before vm0 connects
    long initialQueuedMsgs = stats.getAsyncQueuedMsgs();

    // create receiver in vm0 with queuing enabled
    final Properties p = new Properties();
    p.setProperty(ASYNC_DISTRIBUTION_TIMEOUT, String.valueOf(1000 * 4)); // 4 sec
    p.setProperty(ASYNC_QUEUE_TIMEOUT, "86400000"); // max value
    p.setProperty(ASYNC_MAX_QUEUE_SIZE, "1024"); // max value

    getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
      @Override
      public void run2() throws CacheException {
        getSystem(p);
        AttributesFactory af = new AttributesFactory();
        af.setScope(Scope.DISTRIBUTED_NO_ACK);
        af.setDataPolicy(DataPolicy.REPLICATE);

        doTestPartialMessage_Listener = new ControlListener();
        af.setCacheListener(doTestPartialMessage_Listener);
        createRootRegion("slowrec", af.create());
      }
    });

    // put vm0 cache listener into wait
    LogWriterUtils.getLogWriter().info("[testPartialMessage] about to put vm0 into wait");
    final int millisToWait = 1000 * 60 * 5; // 5 minutes
    r.put(KEY_WAIT, new Integer(millisToWait));

    // build up queue size
    LogWriterUtils.getLogWriter().info("[testPartialMessage] building up queue size...");
    final Object key = "key";
    final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
    final int VALUE_SIZE = socketBufferSize * 3;
    // 1024 * 20; // 20 KB
    final byte[] value = new byte[VALUE_SIZE];

    int count = 0;
    while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
      count++;
      r.put(key, value, new Integer(count));
    }

    final int partialId = count;
    assertEquals(0, stats.getAsyncConflatedMsgs());

    LogWriterUtils.getLogWriter().info("[testPartialMessage] After " + count + " puts of size "
        + VALUE_SIZE + " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());

    Wait.pause(2000);

    // conflate 10 times
    while (stats.getAsyncConflatedMsgs() < 10) {
      count++;
      r.put(key, value, new Integer(count));
      if (count == partialId + 1) {
        assertEquals(initialQueuedMsgs + 2, stats.getAsyncQueuedMsgs());
        assertEquals(0, stats.getAsyncConflatedMsgs());
      } else if (count == partialId + 2) {
        assertEquals(initialQueuedMsgs + 2, stats.getAsyncQueuedMsgs());
        assertEquals(1, stats.getAsyncConflatedMsgs());
      }
    }

    final int conflateId = count;

    final int[] expectedArgs = {partialId, conflateId};

    // send notify to vm0
    LogWriterUtils.getLogWriter().info("[testPartialMessage] wake up vm0");
    getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
      @Override
      public void run() {
        synchronized (doTestPartialMessage_Listener.CONTROL_LOCK) {
          doTestPartialMessage_Listener.CONTROL_LOCK.notify();
        }
      }
    });

    // wait for queue to be flushed
    LogWriterUtils.getLogWriter().info("[testPartialMessage] wait for vm0");
    getOtherVm().invoke(new SerializableRunnable("Wait for other vm") {
      @Override
      public void run() {
        try {
          synchronized (doTestPartialMessage_Listener.CONTROL_LOCK) {
            boolean done = false;
            while (!done) {
              if (doTestPartialMessage_Listener.callbackArguments.size() > 0) {
                CallbackWrapper last =
                    (CallbackWrapper) doTestPartialMessage_Listener.callbackArguments.getLast();
                Integer lastId = (Integer) last.callbackArgument;
                if (lastId.intValue() == conflateId) {
                  done = true;
                } else {
                  doTestPartialMessage_Listener.CONTROL_LOCK.wait(millisToWait);
                }
              } else {
                doTestPartialMessage_Listener.CONTROL_LOCK.wait(millisToWait);
              }
            }
          }
        } catch (InterruptedException ignore) {
          fail("interrupted");
        }
      }
    });

    // assert values on both listeners
    LogWriterUtils.getLogWriter().info("[testPartialMessage] assert callback arguments");
    getOtherVm().invoke(new SerializableRunnable("Assert callback arguments") {
      @Override
      public void run() {
        synchronized (doTestPartialMessage_Listener.CONTROL_LOCK) {
          LogWriterUtils.getLogWriter()
              .info("[testPartialMessage] " + "doTestPartialMessage_Listener.callbackArguments="
                  + doTestPartialMessage_Listener.callbackArguments);

          assertEquals(doTestPartialMessage_Listener.callbackArguments.size(),
              doTestPartialMessage_Listener.callbackTypes.size());

          int i = 0;
          Iterator argIter = doTestPartialMessage_Listener.callbackArguments.iterator();
          Iterator typeIter = doTestPartialMessage_Listener.callbackTypes.iterator();

          while (argIter.hasNext()) {
            CallbackWrapper wrapper = (CallbackWrapper) argIter.next();
            Integer arg = (Integer) wrapper.callbackArgument;
            typeIter.next(); // Integer type
            if (arg.intValue() < partialId) {
              continue;
            }
            assertEquals(new Integer(expectedArgs[i]), arg);
            // assertIndexDetailsEquals(CALLBACK_UPDATE_INTEGER, type);
            i++;
          }
        }
      }
    });

  }
}
